1 /*
2 * Copyright (C) 2008 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.util.concurrent;
18
19 import com.google.common.base.Preconditions;
20
21 import java.util.ArrayDeque;
22 import java.util.Queue;
23 import java.util.concurrent.Executor;
24 import java.util.logging.Level;
25 import java.util.logging.Logger;
26
27 import javax.annotation.concurrent.GuardedBy;
28
29 /**
30 * Executor ensuring that all Runnables submitted are executed in order,
31 * using the provided Executor, and serially such that no two will ever
32 * be running at the same time.
33 *
 * TODO(user): The tasks are given to the underlying executor as a single
 * task, which means the semantics of the executor may be changed, e.g. the
 * executor may have an afterExecute method that would normally run after
 * every task, but here runs only once per batch of queued tasks.
37 *
38 * TODO(user): What happens in case of shutdown or shutdownNow? Should
39 * TaskRunner check for interruption?
40 *
41 * TODO(user): It would be nice to provide a handle to individual task
42 * results using Future. Maybe SerializingExecutorService?
43 *
44 * @author JJ Furman
45 */
46 final class SerializingExecutor implements Executor {
47 private static final Logger log =
48 Logger.getLogger(SerializingExecutor.class.getName());
49
50 /** Underlying executor that all submitted Runnable objects are run on. */
51 private final Executor executor;
52
53 /** A list of Runnables to be run in order. */
54 @GuardedBy("internalLock")
55 private final Queue<Runnable> waitQueue = new ArrayDeque<Runnable>();
56
57 /**
58 * We explicitly keep track of if the TaskRunner is currently scheduled to
59 * run. If it isn't, we start it. We can't just use
60 * waitQueue.isEmpty() as a proxy because we need to ensure that only one
61 * Runnable submitted is running at a time so even if waitQueue is empty
62 * the isThreadScheduled isn't set to false until after the Runnable is
63 * finished.
64 */
65 @GuardedBy("internalLock")
66 private boolean isThreadScheduled = false;
67
68 /** The object that actually runs the Runnables submitted, reused. */
69 private final TaskRunner taskRunner = new TaskRunner();
70
71 /**
72 * Creates a SerializingExecutor, running tasks using {@code executor}.
73 *
74 * @param executor Executor in which tasks should be run. Must not be null.
75 */
76 public SerializingExecutor(Executor executor) {
77 Preconditions.checkNotNull(executor, "'executor' must not be null.");
78 this.executor = executor;
79 }
80
81 private final Object internalLock = new Object() {
82 @Override public String toString() {
83 return "SerializingExecutor lock: " + super.toString();
84 }
85 };
86
87 /**
88 * Runs the given runnable strictly after all Runnables that were submitted
89 * before it, and using the {@code executor} passed to the constructor. .
90 */
91 @Override
92 public void execute(Runnable r) {
93 Preconditions.checkNotNull(r, "'r' must not be null.");
94 boolean scheduleTaskRunner = false;
95 synchronized (internalLock) {
96 waitQueue.add(r);
97
98 if (!isThreadScheduled) {
99 isThreadScheduled = true;
100 scheduleTaskRunner = true;
101 }
102 }
103 if (scheduleTaskRunner) {
104 boolean threw = true;
105 try {
106 executor.execute(taskRunner);
107 threw = false;
108 } finally {
109 if (threw) {
110 synchronized (internalLock) {
111 // It is possible that at this point that there are still tasks in
112 // the queue, it would be nice to keep trying but the error may not
113 // be recoverable. So we update our state and propogate so that if
114 // our caller deems it recoverable we won't be stuck.
115 isThreadScheduled = false;
116 }
117 }
118 }
119 }
120 }
121
122 /**
123 * Task that actually runs the Runnables. It takes the Runnables off of the
124 * queue one by one and runs them. After it is done with all Runnables and
125 * there are no more to run, puts the SerializingExecutor in the state where
126 * isThreadScheduled = false and returns. This allows the current worker
127 * thread to return to the original pool.
128 */
129 private class TaskRunner implements Runnable {
130 @Override
131 public void run() {
132 boolean stillRunning = true;
133 try {
134 while (true) {
135 Preconditions.checkState(isThreadScheduled);
136 Runnable nextToRun;
137 synchronized (internalLock) {
138 nextToRun = waitQueue.poll();
139 if (nextToRun == null) {
140 isThreadScheduled = false;
141 stillRunning = false;
142 break;
143 }
144 }
145
146 // Always run while not holding the lock, to avoid deadlocks.
147 try {
148 nextToRun.run();
149 } catch (RuntimeException e) {
150 // Log it and keep going.
151 log.log(Level.SEVERE, "Exception while executing runnable "
152 + nextToRun, e);
153 }
154 }
155 } finally {
156 if (stillRunning) {
157 // An Error is bubbling up, we should mark ourselves as no longer
158 // running, that way if anyone tries to keep using us we won't be
159 // corrupted.
160 synchronized (internalLock) {
161 isThreadScheduled = false;
162 }
163 }
164 }
165 }
166 }
167 }